{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MhoQ0WE77laV"
      },
      "source": [
        "##### Copyright 2020 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "_ckMIh7O7s6D"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "jYysdyb-CaWM"
      },
      "source": [
        "# 분산 입력"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "S5Uhzt6vVIB2"
      },
      "source": [
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td><a target=\"_blank\" href=\"https://www.tensorflow.org/tutorials/distribute/input\"><img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\">TensorFlow.org에서 보기</a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/ko/tutorials/distribute/input.ipynb\"><img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\">Google Colab에서 실행</a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://github.com/tensorflow/docs-l10n/blob/master/site/ko/tutorials/distribute/input.ipynb\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\">GitHub에서 소스 보기</a></td>\n",
        "  <td><a href=\"https://storage.googleapis.com/tensorflow_docs/docs-l10n/site/ko/tutorials/distribute/input.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\">노트북 다운로드</a></td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FbVhjPpzn6BM"
      },
      "source": [
        "[tf.distribute](https://www.tensorflow.org/guide/distributed_training) API는 사용자가 단일 머신에서 여러 머신으로 훈련을 쉽게 확장하는 방법을 제공합니다. 모델을 확장할 때 사용자는 입력을 여러 기기로 분산해야 합니다. `tf.distribute`는 입력을 기기에 자동으로 분산할 수 있는 API를 제공합니다.\n",
        "\n",
        "이 가이드는 `tf.distribute` API를 사용하여 분산 데이터세트 및 반복기를 생성할 수 있는 다양한 방법을 보여줍니다. 또한 다음 주제들을 다룹니다.\n",
        "\n",
        "- `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.experimental_distribute_datasets_from_function`의 사용법, 샤딩 및 배치 처리 옵션\n",
        "- 분산 데이터세트를 반복할 수 있는 다양한 방법\n",
        "- `tf.distribute.Strategy.experimental_distribute_dataset` / `tf.distribute.Strategy.experimental_distribute_datasets_from_function` API와 `tf.data` API의 차이점과 사용자가 사용하는 데 있어 발생할 수 있는 제한 사항\n",
        "\n",
        "이 가이드는 Keras API를 사용한 분산 입력 사용법에 대해서는 다루지 않습니다."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MM6W__qraV55"
      },
      "source": [
        "## 분산 데이터세트"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "lNy9GxjSlMKQ"
      },
      "source": [
        "`tf.distribute` API를 사용하여 조정하려면 사용자가 `tf.data.Dataset`을 사용하여 입력을 나타내는 것이 좋습니다. `tf.distribute`는 `tf.data.Dataset`(예: 각 가속기 기기에 데이터 자동 프리페치)에서 효율적으로 작동하도록 만들어졌으며 성능 최적화가 구현에 정기적으로 통합되었습니다. `tf.data.Dataset` 외에 다른 사용에 대한 사용 사례가 있는 경우 나중에 이 가이드의 [섹션](%22tensorinputs%22)을 참고하세요. 비분산 훈련 루프에서 사용자는 먼저 `tf.data.Dataset` 인스턴스를 만든 다음 요소를 반복합니다. 예를 들면 다음과 같습니다.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "pCu2Jj-21AEf"
      },
      "outputs": [],
      "source": [
        "# Import TensorFlow\n",
        "!pip install tf-nightly\n",
        "import tensorflow as tf\n",
        "\n",
        "# Helper libraries\n",
        "import numpy as np\n",
        "import os\n",
        "\n",
        "print(tf.__version__)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "dzLKpmZICaWN"
      },
      "outputs": [],
      "source": [
        "global_batch_size = 16\n",
        "# Create a tf.data.Dataset object.\n",
        "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)\n",
        "\n",
        "@tf.function\n",
        "def train_step(inputs):\n",
        "  features, labels = inputs\n",
        "  return labels - 0.3 * features\n",
        "\n",
        "# Iterate over the dataset using the for..in construct.\n",
        "for inputs in dataset:\n",
        "  print(train_step(inputs))\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ihrhYDYRrVLH"
      },
      "source": [
        "사용자가 존재하는 코드를 최소한으로 변경하면서 `tf.distribute` 전략을 사용할 수 있도록 `tf.data.Dataset` 인스턴스를 배포하고 분산 데이터세트 객체를 반환하는 두 개의 API가 도입되었습니다. 그런 다음 사용자는 이 분산 데이터세트 인스턴스를 반복하고 이전과 같이 모델을 훈련할 수 있습니다. 이제 두 가지 API `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.experimental_distribute_datasets_from_function`를 자세히 살펴보겠습니다."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4AXoHhrsbdF3"
      },
      "source": [
        "### `tf.distribute.Strategy.experimental_distribute_dataset`"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5mVuLZhbem8d"
      },
      "source": [
        "#### 사용법\n",
        "\n",
        "이 API는 `tf.data.Dataset` 인스턴스를 입력으로 받고 `tf.distribute.DistributedDataset` 인스턴스를 반환합니다. 입력 데이터세트를 전역 배치 크기와 동일한 값으로 배치해야 합니다. 이 전역 배치 크기는 모든 기기에서 1스텝에서 처리하려는 샘플의 수입니다. 이 분산 데이터세트를 Python 방식으로 반복하거나 `iter`를 사용하여 반복기를 작성할 수 있습니다. 반환된 객체는 `tf.data.Dataset` 인스턴스가 아니며 데이터세트를 변환하거나 검사하는 다른 API를 지원하지 않습니다. 다른 복제본에 대해 입력을 샤딩하려는 특정 방법이 없는 경우 권장되는 API입니다.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "F2VeZUWUj5S4"
      },
      "outputs": [],
      "source": [
        "global_batch_size = 16\n",
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "\n",
        "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)\n",
        "# Distribute input using the `experimental_distribute_dataset`.\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)\n",
        "# 1 global batch of data fed to the model in 1 step.\n",
        "print(next(iter(dist_dataset)))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QPceDmRht54F"
      },
      "source": [
        "#### 속성"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0Qb6nDgxiN_n"
      },
      "source": [
        "##### 배치 처리\n",
        "\n",
        "`tf.distribute`는 입력된 `tf.data.Dataset` 인스턴스를 전역 배치 크기와 동기화된 복제본 수로 나눈 새 배치 크기로 다시 배치합니다. 동기화 중인 복제본의 수는 훈련 중에 그래디언트 올리듀스(allreduce)에 참여하는 기기의 수와 같습니다. 사용자가 분산 반복기에서 `next`를 호출하면 복제본마다 배치 크기의 데이터가 각 복제본에 반환됩니다. 다시 배치된 데이터세트 카디널리티는 항상 여러 복제본의 배수입니다. 다음은 몇 가지 예입니다.\n",
        "\n",
        "- `tf.data.Dataset.range(6).batch(4, drop_remainder=False)`\n",
        "\n",
        "    - 분산 없음:\n",
        "\n",
        "        - 배치 1: [0, 1, 2, 3]\n",
        "        - 배치 2: [4, 5]\n",
        "\n",
        "    - 2개 복제본에서 분산 포함. 마지막 배치 ([4, 5])는 2개의 복제본으로 분할됩니다.\n",
        "\n",
        "        - 배치 1:\n",
        "            - 복제본 1:[0, 1]\n",
        "            - 복제본 2:[2, 3]\n",
        "        - 배치 2:\n",
        "            - 복제본 2: [4]\n",
        "            - 복제본 2: [5]\n",
        "\n",
        "- `tf.data.Dataset.range(4).batch(4)`\n",
        "\n",
        "    - 분산 없음:\n",
        "        - 배치 1: [[0], [1], [2], [3]]\n",
        "    - 5개 복제본에서 분산:\n",
        "        - 배치 1:\n",
        "            - 복제본 1: [0]\n",
        "            - 복제본 2: [1]\n",
        "            - 복제본 3: [2]\n",
        "            - 복제본 4: [3]\n",
        "            - 복제본 5: []\n",
        "\n",
        "- `tf.data.Dataset.range(8).batch(4)`\n",
        "\n",
        "    - 분산 없음:\n",
        "        - 배치 1: [0, 1, 2, 3]\n",
        "        - 배치 2: [4, 5, 6, 7]\n",
        "    - 3개 복제본에서 분산:\n",
        "        - 배치 1:\n",
        "            - 복제본 1: [0, 1]\n",
        "            - 복제본 2: [2, 3]\n",
        "            - 복제본 3: []\n",
        "        - 배치 2:\n",
        "            - 복제본 1: [4, 5]\n",
        "            - 복제본 2: [6, 7]\n",
        "            - 복제본 3: []\n",
        "\n",
        "참고 : 위의 예는 전역 배치가 다른 복제본에서 분할되는 방법만 보여줍니다. 구현에 따라 각 복제본은 변경될 수 있으므로 각 복제본에서 발생 수 있는 실제 값에 의존하지 않는 것이 좋습니다.\n",
        "\n",
        "데이터세트를 다시 일괄 처리하면 복제본 수에 비례하여 공간 복잡성이 선형적으로 증가합니다. 이는 다중 작업자 훈련 사용 사례에서 입력 파이프라인에 OOM 오류가 발생할 수 있음을 의미합니다. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IszBuubdtydp"
      },
      "source": [
        "##### 샤딩\n",
        "\n",
        "`tf.distribute`는 또한 다중 작업자 훈련에서 입력 데이터세트를 자동 샤딩합니다. 작업자의 CPU 기기에 각 데이터세트가 생성됩니다. 작업자 집합에 대한 데이터세트를 자동 샤딩하면 각 작업자에게 전체 데이터세트의 하위 세트가 할당됩니다(올바른 `tf.data.experimental.AutoShardPolicy`가 설정된 경우). 이는 각 스텝에서 겹치지 않는 데이터세트 요소의 전역 배치 크기가 각 작업자에 의해 처리되도록 합니다. 자동 샤딩에는 `tf.data.experimental.DistributeOptions`을 사용해 지정할 수 있는 몇 개의 다양한 옵션이 있습니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "jwJtsCQhHK-E"
      },
      "outputs": [],
      "source": [
        "dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)\n",
        "options = tf.data.Options()\n",
        "options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA\n",
        "dataset = dataset.with_options(options)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "J7fj3GskHC8g"
      },
      "source": [
        "`tf.data.experimental.AutoShardPolicy`에 대해서 세 가지 다른 옵션을 설정할 수 있습니다.\n",
        "\n",
        "- AUTO: 기본 옵션으로 FILE별 샤딩 시도가 이루어짐을 의미합니다. 파일 기반 데이터세트가 탐지되지 않으면 FILE별 샤딩 시도가 실패합니다. 그러면 `tf.distribute`가 DATA별 샤딩으로 폴백합니다. 입력 데이터세트가 파일 기반이지만 파일 수가 작업자 수보다 적으면, <code>InvalidArgumentError</code>가 ​​발생합니다. 이 경우, 정책을 명시적으로 <code>AutoShardPolicy.DATA</code>로 설정하거나 파일 수가 작업자 수보다 많도록 입력 소스를 더 작은 파일로 분할합니다.\n",
        "\n",
        "- FILE: 모든 작업자에 대해 입력 파일을 샤딩하려는 경우 사용하는 옵션입니다. 입력 파일의 수가 작업자의 수보다 훨씬 많고 파일의 데이터가 균등하게 분산된 경우, 이 옵션을 사용해야 합니다. 이 옵션의 단점은 파일의 데이터가 균등하게 분산되어 있지 않으면 유휴 작업자가 생긴다는 것입니다. 파일의 수가 작업자의 수보다 적으면 `InvalidArgumentError`가 발생합니다. 이 경우에는 정책을 명시적으로 `AutoShardPolicy.DATA`로 설정합니다. 예를 들어, 각각 1개의 복제본이 있는 두 작업자에 2개의 파일을 배포해 보겠습니다. 파일 1에는 [0, 1, 2, 3, 4, 5]가 포함되고 파일 2에는 [6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로, 전역 배치 크기를 4로 설정합니다.\n",
        "\n",
        "    - 작업자 0:\n",
        "        - 배치 1 =  복제본 1: [0, 1]\n",
        "        - 배치 2 =  복제본 1: [2, 3]\n",
        "        - 배치 3 =  복제본 1: [4]\n",
        "        - 배치 4 =  복제본 1: [5]\n",
        "    - 작업자 1:\n",
        "        - 배치 1 =  복제본 2: [6, 7]\n",
        "        - 배치 2 =  복제본 2: [8, 9]\n",
        "        - 배치 3 =  복제본 2: [10]\n",
        "        - 배치 4 =  복제본 2: [11]\n",
        "\n",
        "- DATA: 모든 작업자에 걸쳐 요소를 자동 샤딩합니다. 각 작업자는 전체 데이터세트를 읽고 할당된 샤드만 처리합니다. 다른 모든 샤드는 삭제됩니다. 이 옵션은 일반적으로 입력 파일의 수가 작업자의 수보다 적고 모든 작업자에서 데이터를 더 잘 샤딩하려는 경우에 사용됩니다. 단점은 각 작업자에서 전체 데이터세트를 읽는다는 것입니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다.\n",
        "\n",
        "    - 작업자 0:\n",
        "        - 배치 1 =  복제본 1: [0, 1]\n",
        "        - 배치 2 =  복제본 1: [4, 5]\n",
        "        - 배치 3 =  복제본 1: [8, 9]\n",
        "    - 작업자 1:\n",
        "        - 배치 1 =  복제본 2: [2, 3]\n",
        "        - 배치 2 =  복제본 2: [6, 7]\n",
        "        - 배치 3 =  복제본 2: [10, 11]\n",
        "\n",
        "- OFF: 자동 샤딩을 끄면 각 작업자가 모든 데이터를 처리합니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다. 그러면 각 작업자는 다음과 같은 분산을 볼 수 있습니다.\n",
        "\n",
        "    - 작업자 0:\n",
        "\n",
        "        - 배치 1 = 복제본 1: [0, 1]\n",
        "        - 배치 2 =  복제본 1: [2, 3]\n",
        "        - 배치 3 =  복제본 1: [4, 5]\n",
        "        - 배치 4 =  복제본 1: [6, 7]\n",
        "        - 배치 5 =  복제본 1: [8, 9]\n",
        "        - 배치 6 =  복제본 1: [10, 11]\n",
        "\n",
        "    - 작업자 1:\n",
        "\n",
        "        - 배치 1 =  복제본 2: [0, 1]\n",
        "        - 배치 2 =  복제본 2: [2, 3]\n",
        "        - 배치 3 =  복제본 2: [4, 5]\n",
        "        - 배치 4 =  복제본 2: [6, 7]\n",
        "        - 배치 5 =  복제본 2: [8, 9]\n",
        "        - 배치 6 =  복제본 2: [10, 11] "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OK46ZJGPH5H2"
      },
      "source": [
        "##### 프리페치\n",
        "\n",
        "기본적으로 `tf.distribute`는 사용자 제공 `tf.data.Dataset` 인스턴스의 끝에 프리페치 변환을 추가합니다. 프리페치 변환에 대한 인수 `buffer_size`는 동기화 중인 복제본의 수와 같습니다."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "PjiGSY3gtr6_"
      },
      "source": [
        "### `tf.distribute.Strategy.experimental_distribute_datasets_from_function`"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bAXAo_wWbWSb"
      },
      "source": [
        "#### 사용법\n",
        "\n",
        "이 API는 입력 함수를 받고 `tf.distribute.DistributedDataset` 인스턴스를 반환합니다. 사용자가 전달하는 입력 함수는`tf.distribute.InputContext` 인수를 갖고 `tf.data.Dataset` 인스턴스를 반환해야 합니다. 이 API를 사용하면 `tf.distribute`는 더 이상 입력 함수로부터 반환된 사용자의 `tf.data.Dataset` 인스턴스를 변경하지 않습니다. 데이터세트를 배치하고 샤딩하는 것은 사용자가 해야 합니다. `tf.distribute`는 작업자 각각의 CPU 기기에서 입력 함수를 호출합니다. 사용자가 자체 배치 처리 및 샤딩 로직을 지정할 수 있게 해주는 것 이외에도, 다중 작업자 훈련을 사용할 때 `tf.distribute.Strategy.experimental_distribute_dataset`과 비교하여 확장성과 성능이 더 우수합니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "9ODch-OFCaW4"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "\n",
        "def dataset_fn(input_context):\n",
        "  batch_size = input_context.get_per_replica_batch_size(global_batch_size)\n",
        "  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)\n",
        "  dataset = dataset.shard(\n",
        "    input_context.num_input_pipelines, input_context.input_pipeline_id)\n",
        "  dataset = dataset.batch(batch_size)\n",
        "  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.\n",
        "  return dataset\n",
        "\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "M1bpzPYzt_R7"
      },
      "source": [
        "#### 속성"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7cgzhwiiuBvO"
      },
      "source": [
        "##### 배치 처리\n",
        "\n",
        "입력 함수의 반환 값인 `tf.data.Dataset` 인스턴스는 복제본별 배치 크기를 사용하여 배치해야 합니다. 복제본별 배치 크기는 전역 배치 크기를 동기화 훈련에 참여하는 복제본의 수로 나눈 값입니다. `tf.distribute`가 각 작업자의 CPU 기기에서 입력 함수를 호출하기 때문입니다. 지정된 작업자에서 생성된 데이터세트는 해당 작업자의 모든 복제본에서 사용할 수 있어야 합니다. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "e-wlFFZbP33n"
      },
      "source": [
        "##### 샤딩\n",
        "\n",
        "사용자의 입력 함수에 대한 인수로 암시적으로 전달되는 `tf.distribute.InputContext` 객체는 내부에서 `tf.distribute` 에 의해 생성됩니다. 작업자 수, 현재 작업자 ID 등에 대한 정보가 있습니다.이 입력 함수는 `tf.distribute.InputContext` 오브젝트의 일부인 이러한 특성을 사용하여 사용자가 설정 한 정책에 따라 샤딩을 처리 할 수 있습니다.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7TGwnDM-ICHf"
      },
      "source": [
        "##### 프리페치\n",
        "\n",
        "`tf.distribute` 는 사용자가 제공한 입력 함수가 반환한 `tf.data.Dataset` 의 끝에 프리페치 변환을 추가하지 않습니다."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "iOMsf8kyZZpv"
      },
      "source": [
        "참고 : `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.experimental_distribute_datasets_from_function` **은 `tf.data.Dataset` 유형이 아닌 `tf.distribute.DistributedDataset` 인스턴스를** 반환합니다. Distributed Iterators 섹션에 표시된 대로 이러한 인스턴스를 반복하고 `element_spec` 속성을 사용할 수 있습니다. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dL3XbI1gzEjO"
      },
      "source": [
        "## 분산 반복기"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "w8y54-o9T2Ni"
      },
      "source": [
        "비분산 `tf.data.Dataset` 인스턴스와 유사하게, `tf.distribute.DistributedDataset`에서 요소에 접근하여 반복하려면 `tf.distribute.DistributedDataset` 인스턴스에 반복기를 생성해야 합니다. 다음은 `tf.distribute.DistributedIterator`를 생성하고 모델을 훈련할 때 사용하는 방법입니다.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FlKh8NV0uOtZ"
      },
      "source": [
        "### 사용법"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "eSZz6EqOuSlB"
      },
      "source": [
        "#### Python 같은 루프 구성 사용하기\n",
        "\n",
        "Python 같은 사용자 친화적인 루프를 사용하여 `tf.distribute.DistributedDataset`를 반복할 수 있습니다. `tf.distribute.DistributedIterator`에서 반환된 요소는 단일 `tf.Tensor` 또는 복제본별 값을 포함하는 `tf.distribute.DistributedValues`입니다. `tf.function` 내에 루프를 배치하면 성능이 향상됩니다. 그러나 `tf.function` 내부에 배치된 `tf.distribute.DistributedDataset`를 통한 루프는 현재 `break` 및 `return`이 지원되지 않습니다. .function {/ code8}. `tf.distribute.experimental.MultiWorkerMirroredStrategy` 및 `tf.distribute.TPUStrategy`와 같은 다중 작업자 전략을 사용할 때, `tf.function` 내에 루프를 배치하는 것도 지원하지 않습니다. 단일 작업자 `tf.distribute.TPUStrategy`를 위해 <code>tf.function</code> 내에 루프를 배치하는 것은 동작하지만, TPU pod를 사용할 때는 동작하지 않습니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zt3AHb46Tr3w"
      },
      "outputs": [],
      "source": [
        "global_batch_size = 16\n",
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "\n",
        "dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)\n",
        "\n",
        "@tf.function\n",
        "def train_step(inputs):\n",
        "  features, labels = inputs\n",
        "  return labels - 0.3 * features\n",
        "\n",
        "for x in dist_dataset:\n",
        "  # train_step trains the model using the dataset elements\n",
        "  loss = mirrored_strategy.run(train_step, args=(x,))\n",
        "  print(\"Loss is \", loss)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NchPwTEiuSqb"
      },
      "source": [
        "#### `iter`를 사용하여 명시적인 반복기 만들기\n",
        "\n",
        "`tf.distribute.DistributedDataset` 인스턴스의 요소를 반복하기 위해 `iter` API를 사용하여 `tf.distribute.DistributedIterator`를 생성할 수 있습니다. 명시적인 반복기를 사용하면 고정된 수의 스텝을 반복할 수 있습니다. `tf.distribute.DistributedIterator` 인스턴스 `dist_iterator`에서 다음 요소를 가져오려면 `next(dist_iterator)`, `dist_iterator.get_next()` 또는 `dist_iterator.get_next_as_optional()`을 호출할 수 있습니다. 앞의 두 개는 본질적으로 동일합니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "OrMmakq5EqeQ"
      },
      "outputs": [],
      "source": [
        "num_epochs = 10\n",
        "steps_per_epoch = 5\n",
        "for epoch in range(num_epochs):\n",
        "  dist_iterator = iter(dist_dataset)\n",
        "  for step in range(steps_per_epoch):\n",
        "    # train_step trains the model using the dataset elements\n",
        "    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))\n",
        "    # which is the same as\n",
        "    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))\n",
        "    print(\"Loss is \", loss)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UpJXIlxjqPYg"
      },
      "source": [
        "`next()` 또는 `tf.distribute.DistributedIterator.get_next()`를 사용하여 `tf.distribute.DistributedIterator`의 끝에 도달하면 OutOfRange 오류가 발생합니다. 클라이언트는 Python 측에서 오류를 포착하고 체크포인트 및 평가와 같은 다른 작업을 계속할 수 있습니다. 그러나 호스트 훈련 루프를 사용하는 경우(예: `tf.function`당 여러 스텝 실행) 다음과 같이 작동하지 않습니다.\n",
        "\n",
        "```\n",
        "@tf.function def train_fn(iterator):   for _ in tf.range(steps_per_loop):     strategy.run(step_fn, args=(next(iterator),))\n",
        "```\n",
        "\n",
        "`train_fn`은 스텝 본문을 `tf.range`안에 배치하여 여러 스텝을 래핑합니다. 이 경우 종속성이 없는 루프에서 다른 반복이 병렬로 시작될 수 있으므로 이전 반복 계산이 완료되기 전에 이후 반복에서 OutOfRange 오류가 트리거될 수 있습니다. OutOfRange 오류가 발생하면 함수의 모든 op가 즉시 종료됩니다. 이런 경우를 피하려면 OutOfRange 오류를 발생시키지 않는 대안은 `tf.distribute.DistributedIterator.get_next_as_optional()`입니다. `get_next_as_optional`은 다음 요소를 포함하거나 `tf.distribute.DistributedIterator`가 끝에 도달한 경우 값이 없는 `tf.experimental.Optional`을 반환합니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Iyjao96Vqwyz"
      },
      "outputs": [],
      "source": [
        "# You can break the loop with get_next_as_optional by checking if the Optional contains value\n",
        "global_batch_size = 4\n",
        "steps_per_loop = 5\n",
        "strategy = tf.distribute.MirroredStrategy(devices=[\"GPU:0\", \"CPU:0\"])\n",
        "\n",
        "dataset = tf.data.Dataset.range(9).batch(global_batch_size)\n",
        "distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))\n",
        "\n",
        "@tf.function\n",
        "def train_fn(distributed_iterator):\n",
        "  for _ in tf.range(steps_per_loop):\n",
        "    optional_data = distributed_iterator.get_next_as_optional()\n",
        "    if not optional_data.has_value():\n",
        "      break\n",
        "    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))\n",
        "    tf.print(strategy.experimental_local_results(per_replica_results))\n",
        "train_fn(distributed_iterator)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LaclbKnqzLjf"
      },
      "source": [
        "## `element_spec` 속성 사용"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Z1YvXqOpwy08"
      },
      "source": [
        "분산된 데이터세트의 요소를 `tf.function`으로 전달하여 `tf.TypeSpec` 보장을 원할 경우, `tf.function`의 `input_signature` 인수를 지정합니다. 분산 데이터세트의 출력은 `tf.distribute.DistributedValues`이며 단일 기기 또는 여러 기기에 대한 입력을 나타낼 수 있습니다. 이 분산 값에 해당하는 `tf.TypeSpec`을 가져오려면 분산 데이터세트 또는 분산 반복기 객체의 `element_spec` 속성을 사용할 수 있습니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "pg3B-Cw_cn3a"
      },
      "outputs": [],
      "source": [
        "global_batch_size = 16\n",
        "epochs = 5\n",
        "steps_per_epoch = 5\n",
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "\n",
        "dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)\n",
        "\n",
        "@tf.function(input_signature=[dist_dataset.element_spec])\n",
        "def train_step(per_replica_inputs):\n",
        "  def step_fn(inputs):\n",
        "    return 2 * inputs\n",
        "  \n",
        "  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))\n",
        "\n",
        "for _ in range(epochs):\n",
        "  iterator = iter(dist_dataset)\n",
        "  for _ in range(steps_per_epoch):\n",
        "    output = train_step(next(iterator))\n",
        "    tf.print(output)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-OAa6svUzuWm"
      },
      "source": [
        "## 부분 배치"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hW2_gVkiztUG"
      },
      "source": [
        "사용자가 생성한 `tf.data.Dataset` 인스턴스에 복제본의 수로 균등하게 나눌 수 없는 배치 크기가 포함되어 있거나 데이터세트 인스턴스의 카디널리티가 배치 크기로 나눌 수 없는 경우 부분 배치가 발생합니다. 이는 데이터세트가 여러 복제본에 분산될 때 일부 반복기에 대한 `next` 호출로 OutOfRangeError가 발생함을 의미합니다. 이 사용 사례를 처리하기 위해 `tf.distribute`는 처리할 데이터가 더 이상 없는 복제본에서 배치 크기가 0인 더미 배치를 반환합니다.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "rqutdpqtPcCH"
      },
      "source": [
        "단일 작업자 사례의 경우 반복기에서 `next` 호출로 데이터가 반환되지 않으면 배치 크기가 0인 더미 배치가 작성되어 데이터세트의 실제 데이터와 함께 사용됩니다. 부분 배치의 경우 마지막 전역 배치 데이터에는 더미 배치 데이터와 함께 실제 데이터가 포함됩니다. 데이터 처리를 위한 중지 조건이 이제 복제본에 데이터가 있는지 확인합니다. 복제본에 데이터가 없으면 OutOfRange 오류가 발생합니다.\n",
        "\n",
        "다중 작업자 사례의 경우, 각 작업자에서 데이터의 존재를 나타내는 boolean 값은 교차 복제본 통신을 사용하여 집계하며, 이는 모든 작업자가 분산 데이터세트의 처리를 완료했는지 식별하는 데 사용됩니다. 여기에는 작업자 간 의사소통이 포함되므로 성능에 약간의 불이익이 따릅니다.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vehLsgljz90Y"
      },
      "source": [
        "## 경고 사항"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Nx4jyN_Az-Dy"
      },
      "source": [
        "- 다중 작업자 설정에서  `tf.distribute.Strategy.experimental_distribute_dataset` API를 사용할 때, 사용자는 파일로부터 읽은 `tf.data.Dataset`을 전달합니다. <code>tf.data.experimental.AutoShardPolicy</code>가 <code>AUTO</code> 또는 <code>FILE</code>로 설정된 경우, 스텝당 실제 배치 사이즈는 사용자가 정의한 전역 배치 크기보다 더 작을 수 있습니다. 파일에 남아 있는 요소가 전역 배치 크기보다 더 적을 때 이런 경우가 발생할 수 있습니다. 사용자는 실행 스텝의 수에 의존하지 않고 데이터세트를 소진하거나 <code>tf.data.experimental.AutoShardPolicy</code>를 <code>DATA</code>로 설정하여 문제를 해결할 수 있습니다.\n",
        "\n",
        "- 상태 저장 데이터세트 변환은 현재 `tf.distribute`에서 지원되지 않으며 데이터세트에 있을 수 있는 상태 저장 연산은 현재 무시됩니다. 예를 들어, 데이터세트에 `map_fn`을 사용하여 이미지를 회전시키는 `tf.random.uniform`이 있는 경우, Python 프로세스가 실행되는 로컬 머신의 상태(예: 임의 시드)에 의존하는 데이터세트 그래프가 있습니다.\n",
        "\n",
        "- `tf.distribute`와 함께 사용될 때와 같이, 특정 컨텍스트에서 기본적으로 비활성화되어 있는 실험적인 `tf.data.experimental.OptimizationOptions`은 성능 저하를 유발합니다. 배포 설정에서 워크로드의 성능 향상이 확인된 후에만 이 옵션을 활성하세요.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dAC_vRmJyzrB"
      },
      "source": [
        "- `tf.distribute.experimental_distribute_dataset` 또는 `tf.distribute.experimental_distribute_datasets_from_function`을 사용할 때 작업자에 의해 처리되는 데이터의 순서는 보장되지 않습니다. 일반적으로 `tf.distribute`을 사용하여 예측을 조정하는 경우 요구됩니다. 하지만 배치의 각 요소에 인덱스를 삽입하고 그에 맞게 출력을 정렬하면 됩니다. 다음은 출력을 정렬하는 방법에 대한 예제 코드 조각입니다.\n",
        "\n",
        "참고: 편의상 `tf.distribute.MirroredStrategy()`를 사용합니다. 여러 작업자를 사용하고 `tf.distribute.MirroredStrategy`를 사용하여 단일 작업자에게 훈련을 배포하는 경우에만 입력 순서를 다시 지정하면 됩니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Zr2xAy-uZZaL"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "dataset_size = 24\n",
        "batch_size = 6\n",
        "dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)\n",
        "\n",
        "def predict(index, inputs):\n",
        "  outputs = 2 * inputs\n",
        "  return index, outputs\n",
        "\n",
        "result = {}\n",
        "for index, inputs in dist_dataset:\n",
        "  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))\n",
        "  indices = list(mirrored_strategy.experimental_local_results(output_index))\n",
        "  rindices = []\n",
        "  for a in indices:\n",
        "    rindices.extend(a.numpy())\n",
        "  outputs = list(mirrored_strategy.experimental_local_results(outputs))\n",
        "  routputs = []\n",
        "  for a in outputs:\n",
        "    routputs.extend(a.numpy())\n",
        "  for i, value in zip(rindices, routputs):\n",
        "    result[i] = value\n",
        "\n",
        "print(result)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nNbn7HXx0YqB"
      },
      "source": [
        "<a name=\"tensorinputs\"> # 표준 tf.data.Dataset 인스턴스를 사용하지 않는 경우 데이터를 어떻게 배포하나요? </a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dymZixqo0nKK"
      },
      "source": [
        "때때로 사용자가 `tf.data.Dataset`을 사용하여 입력을 나타내고 이후에 언급한 API를 사용하여 데이터 세트를 여러 기기에 분배할 수 없습니다. 이런 경우 생성기의 원시 텐서 또는 입력을 사용할 수 있습니다.\n",
        "\n",
        "### 임의의 텐서 입력에 experiment_distribute_values_from_function 사용하기\n",
        "\n",
        "`strategy.run`은 `next(iterator)`의 출력인 `tf.distribute.DistributedValues`를 허용합니다. 텐서 값을 전달하려면 `experimental_distribute_values_from_function`을 사용하여 원시 텐서에서 `tf.distribute.DistributedValues`를 구성합니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ajZHNRQs0kqm"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "worker_devices = mirrored_strategy.extended.worker_devices\n",
        "\n",
        "def value_fn(ctx):\n",
        "  return tf.constant(1.0)\n",
        "\n",
        "distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)\n",
        "for _ in range(4):\n",
        "  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))\n",
        "  print(result)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "P98aFQGf0x_7"
      },
      "source": [
        "### 생성기에서 입력한 경우 tf.data.Dataset.from_generator 사용하기"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "emZCWQSi04qT"
      },
      "source": [
        "사용하려는 생성기 함수가 있는 경우, `from_generator` API를 사용하여 `tf.data.Dataset` 인스턴스를 생성할 수 있습니다.\n",
        "\n",
        "참고: 현재 `tf.distribute.TPUStrategy`에서는 지원하지 않습니다."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "jRhU0X230787"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "def input_gen():\n",
        "  while True:\n",
        "    yield np.random.rand(4)\n",
        "\n",
        "# use Dataset.from_generator\n",
        "dataset = tf.data.Dataset.from_generator(\n",
        "    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)\n",
        "iterator = iter(dist_dataset)\n",
        "for _ in range(4):\n",
        "  mirrored_strategy.run(lambda x:x, args=(next(iterator),))"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "input.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
